package com.uxsino.simo.collector.connections;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.uxsino.simo.connections.AbstractConnection;
import com.uxsino.simo.connections.target.ActiveMQTarget;

public class ActiveMQConnection extends AbstractConnection<ActiveMQTarget> {

    private static Logger logger = LoggerFactory.getLogger(ActiveMQConnection.class);

    private JMXConnector jmxc;

    private MBeanServerConnection mbeanServerConn;

    private String baseObjNameStr = "org.apache.activemq:brokerName=";

    private String brokerName;

    private BrokerViewMBean mBean;

    @Override
    public int connect(ActiveMQTarget target) {
        super.connect(target);
        logger.info("start connecting to ActiveMQ mbeanserver");
        this.target = target;
        connected = false;
        state = 0;
        try {
            String url = "service:jmx:rmi:///jndi/rmi://" + target.host + ":" + target.port + "/jmxrmi";
            JMXServiceURL urls = new JMXServiceURL(url);
            Map<String, String[]> environment = new HashMap<String, String[]>();
            String[] credentials = new String[] { target.getUsername(), target.getPassword() };
            environment.put("jmx.remote.credentials", credentials);
            jmxc = JMXConnectorFactory.connect(urls, environment);
            mbeanServerConn = jmxc.getMBeanServerConnection();
            brokerName = target.getBrokerName();
            ObjectName name = new ObjectName(baseObjNameStr + brokerName + ",type=Broker");
            mBean = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServerConn, name,
                BrokerViewMBean.class, true);
            logger.info("connected to mbeanserver");
            connected = true;
        } catch (IOException e) {
            connected = false;
            logger.error(" connection Exception: {}", e.getMessage());
            return state;
        } catch (MalformedObjectNameException e) {
            connected = false;
            logger.error("exception of new ObjectName ", e.getMessage());
            return state;
        }
        state = 1;
        return state;
    }

    @Override
    public Object execCmd(Object cmdPattern) {
        String cmd = (String) cmdPattern;
        return getInfo(cmd);
    }

    @Override
    public Object buildCmd(String cmdPattern, Map<String, String> args) {
        return cmdPattern;
    }

    @Override
    public int close() {

        if (mbeanServerConn != null) {
            try {
                jmxc.close();
            } catch (IOException e) {
                logger.error("error close jmx connector: ", e);
                connected = false;
            }
            mbeanServerConn = null;
            connected = false;
        }
        super.close();
        return 0;
    }

    public Object getInfo(String cmd) {
        if (cmd == null) {
            logger.error("cmd is null");
            return null;
        }
        JSONObject result = new JSONObject();
        JSONArray jsonArr = new JSONArray();

        if (cmd.equals("GeneralInfo")) {
            result = getGeneralInfo();
            return result;
        } else if (cmd.equals("quereInfo")) {
            ObjectName[] queues = mBean.getQueues();
            jsonArr = getInfo4QueueAndTopic(queues);
            return jsonArr;
        } else if (cmd.equals("topicInfo")) {
            ObjectName[] topics = mBean.getTopics();
            jsonArr = getInfo4QueueAndTopic(topics);
            return jsonArr;
        } else if (cmd.equals("subscribersInfo")) {

        } else if (cmd.equals("connectionInfo")) {

        }
        return "";
    }

    private JSONObject getGeneralInfo() {
        JSONObject jobj = new JSONObject();
        jobj.put("brokerId", mBean.getBrokerId());
        jobj.put("brokerName", mBean.getBrokerName());
        jobj.put("brokerVersion", mBean.getBrokerVersion());
        jobj.put("totalEnqueueCount", mBean.getTotalEnqueueCount());
        jobj.put("totalDequeueCount", mBean.getTotalDequeueCount());
        jobj.put("totalConsumerCount", mBean.getTotalConsumerCount());
        jobj.put("totalMessageCount", mBean.getTotalMessageCount());
        jobj.put("memoryPercentUsage", mBean.getMemoryPercentUsage());
        jobj.put("storePercentUsage", mBean.getStorePercentUsage());
        jobj.put("memoryLimit", mBean.getMemoryLimit());
        return jobj;
    }

    private JSONArray getInfo4QueueAndTopic(ObjectName[] ObjectNames) {
        JSONArray jArr = new JSONArray();

        for (ObjectName na : ObjectNames) {
            JSONObject jobj = new JSONObject();
            DestinationViewMBean queueBean = (DestinationViewMBean) MBeanServerInvocationHandler
                .newProxyInstance(mbeanServerConn, na, DestinationViewMBean.class, true);
            jobj.put("name", queueBean.getName());
            jobj.put("queueSize", queueBean.getQueueSize());
            jobj.put("dispatchCount", queueBean.getDispatchCount());
            jobj.put("consumerCount", queueBean.getConsumerCount());
            jobj.put("producerCount", queueBean.getProducerCount());
            jobj.put("dequeueCount", queueBean.getDequeueCount());
            jobj.put("expiredCount", queueBean.getExpiredCount());
            jobj.put("memoryPercentUsage", queueBean.getMemoryPercentUsage());
            jobj.put("memoryLimit", queueBean.getMemoryLimit());
            jobj.put("queueSize", queueBean.getQueueSize());
            jobj.put("averageEnqueueTime", queueBean.getAverageEnqueueTime());
            jobj.put("maxEnqueueTime", queueBean.getMaxEnqueueTime());
            jobj.put("minEnqueueTime", queueBean.getMinEnqueueTime());
            jArr.add(jobj);
        }

        return jArr;
    }

    public JSONObject getSubscribersInfo(JSONObject result) {
        JSONObject jsonObject = new JSONObject();
        return jsonObject;
    }

    public JSONObject getConnectionInfo(JSONObject result) {
        JSONObject jsonObject = new JSONObject();
        return jsonObject;
    }

    @Override
    public boolean testWithConnected(String cmdString, String resStart) {
        try {
            if (mBean == null) {
                logger.error("连接测试失败, BrokerViewMBean  mBean 为null：{}", target);
                connected = false;
                return false;
            }
            String brokerName = mBean.getBrokerName();
            if (brokerName != null && brokerName.equals(target.getBrokerName())) {
                connected = true;
            }
        } catch (Exception e) {
            connected = false;
            logger.error("error executing cmd in activemq connection", e);
        }
        logger.info("activemq connection test result: {}", connected);
        return connected;
    }

    @Override
    public boolean connectAndTest(ActiveMQTarget target, String cmdString, String resStart) {
        boolean result = false;
        try {
            connect(target);
            result = testWithConnected(cmdString, resStart);
        }catch (Exception e){
            logger.warn("close connection error : {}", e);
        }finally {
            close();
        }
        return result;

    }

}
